
Stateful Actors #2133

Merged · 3 commits into dask:master, Aug 6, 2018

Conversation

mrocklin
Member

@mrocklin mrocklin commented Jul 19, 2018

This allows Dask to manage remote stateful classes. This has a few advantages:

  1. We can update state quickly without having to engage pure tasks
  2. Actor operations happen between workers without the scheduler, and so have lower latency and don't suffer the same bottlenecks

And a few drawbacks

  1. Makes no attempt at resilience to worker failure
  2. Not fully integrated with the task scheduling framework (passing actor futures to tasks won't give the same semantics as with normal futures, for example)

Fixes #2109

Example

In [1]: from dask.distributed import Client

In [2]: client = Client(processes=False)

In [3]: class Counter:
   ...:     n = 0
   ...:     def __init__(self):
   ...:         self.n = 0
   ...:     def increment(self):
   ...:         self.n += 1
   ...:         return self.n
   ...:     

In [4]: counter = client.submit(Counter, actors=True)

In [5]: counter = counter.result()

In [6]: counter.n
Out[6]: 0

In [7]: counter.increment()
Out[7]: <distributed.actor.ActorFuture at 0x7f31b44d5d68>

In [8]: _.result()
Out[8]: 1

In [9]: counter.n
Out[9]: 1

In [10]: %time counter.increment().result()
CPU times: user 4.06 ms, sys: 21 µs, total: 4.08 ms
Wall time: 3.91 ms
Out[10]: 2

In [11]: %%time
    ...: for i in range(1000):
    ...:     counter.increment()
    ...: 
CPU times: user 566 ms, sys: 40.1 ms, total: 606 ms
Wall time: 585 ms

In [12]: %time counter.n
CPU times: user 5.65 ms, sys: 125 µs, total: 5.77 ms
Wall time: 4.25 ms
Out[12]: 1002

Performance

Current roundtrip latency is around a millisecond:

In [14]: %timeit counter.n
3.57 ms ± 727 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

In [15]: client = Client(direct_to_workers=True)

In [16]: counter = client.submit(Counter, actor=True)

In [17]: %timeit counter.n
1.25 ms ± 30.9 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)

@mrocklin mrocklin changed the title [WIP] Stateful Actors Stateful Actors Jul 26, 2018
@mrocklin
Member Author

I've removed the WIP label. This seems decent enough to me, at least as a first pass. I suspect that we'll change the implementation of ActorFuture eventually, but I think the user API has stabilized.

@stsievert
Member

I've drafted a parameter server that uses these Actors: https://gist.github.com/stsievert/7135380e1227236bde03a852cae93a37

I have a question: shouldn't client.gather work on actor futures too? Here's a minimal example of what I mean:

In [1]: from distributed import Client

In [2]: class Dummy:
   ...:     def __init__(self, value):
   ...:         self.value = value
   ...:     def get_value(self):
   ...:         return self.value
   ...:

In [3]: client = Client()

In [4]: futures = client.map(Dummy, [1, 2], actor=True)

In [5]: dummies = client.gather(futures)

In [6]: values = [dummy.get_value() for dummy in dummies]

In [7]: values
Out[7]: [<ActorFuture>, <ActorFuture>]

In [8]: client.gather(values)
Out[8]: [<ActorFuture>, <ActorFuture>]

I expected client.gather(values) == [1, 2], as with regular futures:

In [6]: v = [client.submit(lambda x: x, i) for i in range(2)]

In [7]: v
Out[7]:
[<Future: status: finished, type: int, key: <lambda>-e0ea2bfe577f8de576a4698ab710d5ae>,
 <Future: status: finished, type: int, key: <lambda>-982d509e7b5d6f50f0a2ed3fb2838fcf>]

In [8]: client.gather(v)
Out[8]: [0, 1]

@mrocklin
Member Author

I have a question: shouldn't client.gather work on actor futures too?

I agree that it would be nice to unify everything, and maybe that will happen some day. But that isn't how things work now, and I don't think it's likely to be that way soon. I recommend just using the .result() API.
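For concreteness, the recommended .result() pattern looks like the following. This sketch doesn't assume a running cluster: it stands in concurrent.futures for actor method calls, so the executor and the Dummy class here are illustrative, not the distributed API.

```python
from concurrent.futures import ThreadPoolExecutor

class Dummy:
    def __init__(self, value):
        self.value = value
    def get_value(self):
        return self.value

# Stand-in for actor method calls: each call yields a future, and
# results are collected explicitly with .result() rather than with
# client.gather.
with ThreadPoolExecutor() as pool:
    dummies = [Dummy(1), Dummy(2)]
    futures = [pool.submit(d.get_value) for d in dummies]
    values = [f.result() for f in futures]

print(values)  # [1, 2]
```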

@mrocklin
Member Author

@jcrist can I ask for your review on this?

Member

@jcrist jcrist left a comment

I only gave this a cursory review - the actual implementation is a bit opaque to me.

As far as api/docs, I'm a bit confused by the barrier between Future and ActorFuture - when is it ok to mix them and when do they need to be separated?

Additionally, overloading the existing api seems odd to me, specifically in graph-level operations. compute(graph, actors=True) seems to indicate that only the end results are actors, not the intermediate values, but I could also interpret things the other way.

Do you see use cases for returning Actor objects from compute/persist? If not, I'd be inclined to not overload the existing methods, and only support something like client.new_actor(cls, *args, **kwargs), which may help make things clearer (at least for me).
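To make the Future/ActorFuture distinction more concrete, here is a minimal pure-Python sketch of the proxy idea behind actor handles. ActorProxy is a made-up name for illustration, not the distributed implementation: method calls go through a single-threaded executor (so state changes are serialized, as on one worker) and return futures, while plain attribute access returns values directly.

```python
from concurrent.futures import ThreadPoolExecutor

class ActorProxy:
    """Toy stand-in for an actor handle: forwards method calls to a
    one-thread executor and returns futures (like ActorFuture)."""
    def __init__(self, cls, *args, **kwargs):
        self._executor = ThreadPoolExecutor(max_workers=1)  # one thread = one actor
        self._instance = cls(*args, **kwargs)

    def __getattr__(self, key):
        attr = getattr(self._instance, key)  # raises AttributeError naturally
        if callable(attr):
            # Method access: wrap the call so it returns a future.
            return lambda *a, **kw: self._executor.submit(attr, *a, **kw)
        # Plain attribute access: return the value directly.
        return attr

class Counter:
    def __init__(self):
        self.n = 0
    def increment(self):
        self.n += 1
        return self.n

counter = ActorProxy(Counter)
future = counter.increment()   # a Future, not 1
assert future.result() == 1
assert counter.n == 1          # attributes come back as values
```

The asymmetry this sketch shows (methods yield futures, attributes yield values) is the same one the example session at the top of the PR demonstrates.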


def __dir__(self):
    o = set(dir(type(self)))
    o.update({attr for attr in dir(self._cls) if not attr.startswith('_')})
Member
No need for the { } brackets here.

def __getattr__(self, key):
    if not hasattr(self._cls, key):
        raise AttributeError("%s does not have attribute %s" %
                             (type(self).__name__, key))
Member
Should be able to just rely on getattr(self._cls, key) raising this error below.
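A quick illustration of this point (Foo is a throwaway class): getattr raises a descriptive AttributeError on its own, so the explicit hasattr check is redundant.

```python
class Foo:
    pass

# getattr already raises an AttributeError that names both the class
# and the missing attribute, with no hasattr guard needed.
try:
    getattr(Foo, 'missing')
except AttributeError as exc:
    message = str(exc)

print(message)  # type object 'Foo' has no attribute 'missing'
```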

Member Author
Thanks, fixed

workers = kwargs.pop('workers', None)
resources = kwargs.pop('resources', None)
retries = kwargs.pop('retries', None)
priority = kwargs.pop('priority', 0)
fifo_timeout = kwargs.pop('fifo_timeout', '100ms')
allow_other_workers = kwargs.pop('allow_other_workers', False)
actor = kwargs.pop('actor', False)
actors = kwargs.pop('actors', False)
actor = actor or actors
Member
actor = kwargs.pop('actor', kwargs.pop('actors', False))
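For the record, a quick check of the suggested one-liner's semantics (pop_actor is a throwaway helper, not code from this PR). The inner default expression is evaluated eagerly, so both keys are always consumed; note one subtle difference from `actor or actors`: an explicit actor=False wins even when actors=True is also passed.

```python
def pop_actor(**kwargs):
    # Both 'actor' and 'actors' are popped on every call, because the
    # default expression runs before the outer pop.
    return kwargs.pop('actor', kwargs.pop('actors', False))

assert pop_actor() is False
assert pop_actor(actor=True) is True
assert pop_actor(actors=True) is True
# Differs from `actor or actors`, which would give True here:
assert pop_actor(actor=False, actors=True) is False
```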

Member
Why support both actor and actors here? I'd prefer only a single boolean flag between all functions to keep things consistent.

Member Author
So actors would be general purpose, but it seemed a bit odd for submit

client.submit(Foo, actors=True)

It also seemed error-prone to have one keyword for submit and one for map, so I just used both in both places. I agree that this is wonky, though, and am not surprised that it would not survive review. Do you have any suggestions? Use actors= everywhere, including submit?

workers = kwargs.pop('workers', None)
retries = kwargs.pop('retries', None)
resources = kwargs.pop('resources', None)
user_priority = kwargs.pop('priority', 0)
allow_other_workers = kwargs.pop('allow_other_workers', False)
fifo_timeout = kwargs.pop('fifo_timeout', '100ms')
actor = kwargs.pop('actor', False)
actors = kwargs.pop('actors', False)
actor = actor or actors
Member
actor = kwargs.pop('actor', kwargs.pop('actors', False))

try:
    exception = protocol.pickle.loads(exception)
except Exception:
    exception = Exception(exception)
Member
Why are you doing this?

Member Author
Sometimes exceptions don't come in as serialized bytes; sometimes they come in as just a string of an error message. This is the case when the worker produces a non-serializable exception (which happens sometimes) or when the scheduler needs to return an exception. This came up in testing in this PR. I think that long term we probably need a better structure, where we return a message that includes both the exception and how the exception is represented, but I would prefer not to handle that in this PR.
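A toy sketch of this fallback (load_exception is an illustrative name, not a distributed function): try to unpickle the payload, and if that fails, wrap whatever arrived in a plain Exception.

```python
import pickle

def load_exception(payload):
    """Deserialize an exception that may arrive as pickled bytes or,
    when the original wasn't serializable, as a bare message string."""
    try:
        return pickle.loads(payload)
    except Exception:
        return Exception(payload)

good = load_exception(pickle.dumps(ValueError("boom")))
assert isinstance(good, ValueError)

bad = load_exception("unserializable error from worker")
assert isinstance(bad, Exception) and not isinstance(bad, ValueError)
```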


for ts in ws.actors:
if ts.state not in {'memory', 'processing'}:
import pdb; pdb.set_trace()
Member
Leftover from debugging.

Member Author
Thanks, fixed.

@mrocklin
Member Author

Do you see use cases for returning Actor objects from compute/persist? If not, I'd be inclined to not overload the existing methods, and only support something like client.new_actor(cls, *args, **kwargs), which may help make things clearer (at least for me).

I've just added a test that uses actors with compute. Yes, I think that this is a valid use case.

There is also some maintenance cost to adding new future-creating methods like submit/map/compute/persist. Any new option like retries or restrictions ends up being added to all of these. I'm inclined to reuse when possible.

@mrocklin
Member Author

As far as api/docs, I'm a bit confused by the barrier between Future and ActorFuture - when is it ok to mix them and when do they need to be separated?

I'll try to add some documentation around this point. Thank you for raising it.

@mrocklin
Member Author

When I ran through a benchmark with a pseudo-parameter-server workload I found that I was getting latencies in the 5-10 ms range, which seems pretty high. This led to some work on profiling the scheduler and worker administrative threads (where I suspect most of the blame lies). Hopefully I can get this down in the future.

@mrocklin
Member Author

My expectation is that Dask will run somewhere around 1-2 ms per roundtrip in the medium term. We would probably have to look at some more serious changes to get below that (but that's certainly possible as well).

@mrocklin mrocklin merged commit b16ee25 into dask:master Aug 6, 2018
@mrocklin mrocklin deleted the actor branch August 6, 2018 18:01